package cn.doitedu.rtmk.demo.demo6.groovy;

import cn.doitedu.rtmk.demo.demo5.EventBean;
import cn.doitedu.rtmk.demo.demo5.RtmkMessage;
import cn.doitedu.rtmk.demo.demo5.RuleModelCalculator;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.util.Collector;

@Slf4j
public class RuleModel002Calculator implements RuleModelCalculator {
    RuleModel002ParamBean ruleParamBean;
    ValueState<Integer> valueState;


    @Override
    public String getRuleId() {
        return ruleParamBean.getRuleId();
    }

    public void init(String ruleParamJson, RuntimeContext runtimeContext) {
        ruleParamBean = JSON.parseObject(ruleParamJson, RuleModel002ParamBean.class);
        valueState = runtimeContext.getState(new ValueStateDescriptor<Integer>("rule2_state", Integer.class));
    }


    public void calc(
            EventBean eventBean,
            Collector<RtmkMessage> out
            ) throws Exception {

        /* *
         * 一、进行规则所需要的实时画像标签的计算
         */
        // 实时画像条件：发生过 add_cart事件，且添加的商品的价格>200
        if (valueState.value() == null || valueState.value() == 0) {
            if (eventBean.getEventId().equals(ruleParamBean.getDynamicProfileEventId())
                    && Double.parseDouble(eventBean.getProperties().get(ruleParamBean.getDynamicProfilePropertyKey())) > ruleParamBean.getDynamicProfilePropertyValue()) {
                valueState.update(1);
            }
        }

        /* *
         * 二、判断是否是预定义的营销规则的触发事件 ：加购事件（加购的商品价格>100)
         */

        if (eventBean.getEventId().equals(ruleParamBean.getFireEventId())) {
            log.warn("用户:{},触发了规则：{}" ,eventBean.getUserId(),ruleParamBean.getRuleId());

            // 判断该事件的行为人是否满足营销规则中定义的判断条件
            if (valueState.value() != null && valueState.value() == 1) {
                log.warn("用户: {},时间: {} ,命中了规则: {}",eventBean.getUserId(),eventBean.getTimeStamp(),ruleParamBean.getRuleId());
                out.collect(new RtmkMessage(eventBean.getUserId(), eventBean.getTimeStamp(), ruleParamBean.getRuleId()));
            }

        }
    }
}
